Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
25bc89c
Refactor watcher operators so the base definitions can be reused by t…
tatiana Jan 5, 2026
7c96d1b
Fix unittests
tatiana Jan 5, 2026
a993b0d
Improve test coverage
tatiana Jan 5, 2026
f1a4e28
Introduce Cosmos Watcher K8s mode
tatiana Dec 12, 2025
0c4e67e
Create override class
tatiana Dec 16, 2025
ef4a20a
Add patched pod manager
tatiana Dec 16, 2025
e8c5ab9
Make event capture in execute_complete consistent
tatiana Dec 17, 2025
4e27db3
Use latest version of AF and k8s provider
tatiana Dec 17, 2025
8eedf4e
Attempt to make compatible with older versions of the K8s pod manager
tatiana Dec 17, 2025
aa0f1e6
Attempt to make compatible with older versions of the K8s pod manager
tatiana Dec 17, 2025
77a27ce
Try to fix integration tests
tatiana Dec 17, 2025
740abea
Fix K8s provider version check
tatiana Dec 18, 2025
0ab7576
Revert change to example k8s dag
tatiana Dec 18, 2025
43c33ca
Improve K8s integration tests
tatiana Dec 18, 2025
86db5f3
Fix running k8s watcher mode
tatiana Dec 18, 2025
2515e45
Fix running k8s watcher mode
tatiana Dec 18, 2025
7e9e638
Try to fix CI issue
tatiana Dec 18, 2025
e035916
Try to fix CI issue
tatiana Dec 18, 2025
73f87ee
Fix version of protobuf so is compatible between Airflow and dbt in K…
tatiana Dec 18, 2025
1298fcc
Add missing DAG
tatiana Dec 18, 2025
cf9f487
Do not run the k8s dag when it's not desired
tatiana Dec 18, 2025
67e170c
Release 1.13.0a1
tatiana Dec 18, 2025
c924e53
Release 1.13.0a1
tatiana Dec 18, 2025
b625a24
Improve log message when trying to poke XCom
tatiana Dec 29, 2025
2e9964f
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Dec 29, 2025
bcb07b1
Fix path to triggerer
tatiana Dec 29, 2025
846962d
Improve watcher code coverage
tatiana Dec 30, 2025
239f022
Adjustments after rebase
tatiana Dec 30, 2025
52ae820
Clean up tests
tatiana Dec 30, 2025
7ab87b8
Fix import path
tatiana Dec 30, 2025
fb0cf72
Address feedback from @johnhoran in https://github.com/astronomer/ast…
tatiana Dec 30, 2025
ef7bde1
Replace Cosmos K8s Pod operator fix by the one proposed by @johnhoran…
tatiana Dec 31, 2025
5a6012d
Improve test coverage
tatiana Dec 31, 2025
224434c
Adjust watcher k8s to use base class
tatiana Jan 5, 2026
5ccb1bf
Fix logging
tatiana Jan 5, 2026
d407ae2
Fix unittests after rebase based on
tatiana Jan 6, 2026
9fdf602
Fix type error
tatiana Jan 6, 2026
4ed2bf6
Fix import error
tatiana Jan 6, 2026
db2034b
Tell codecov to ignore file that is from Airlfow code-base
tatiana Jan 6, 2026
8e3d400
Improve test coverage of cosmos/operators/_watcher/base.py
tatiana Jan 6, 2026
37f1372
Improve test coverage of AFTER_ALL with KUBERNETES_WATCHER
tatiana Jan 6, 2026
072bbaa
Add tests for DbtBuildWatcherKubernetesOperator
tatiana Jan 6, 2026
57a07bf
Add test for validating the K8s watcher consumer behaves as a sensor …
tatiana Jan 6, 2026
544643a
Reduce create_test_task_metadata function complexity
tatiana Jan 6, 2026
13fd0d2
Fix test name
tatiana Jan 6, 2026
e9a94d3
Add tests related to watcher k8s retries
tatiana Jan 6, 2026
989e909
Address PR feedback
tatiana Jan 6, 2026
62d87bc
Address PR feedback https://github.com/astronomer/astronomer-cosmos/p…
tatiana Jan 6, 2026
b5bfce0
Apply suggestion from @Copilot
tatiana Jan 6, 2026
a6f3fce
Revert log message
tatiana Jan 6, 2026
ba7f0dc
Address copilot feedback
tatiana Jan 6, 2026
19903ea
Update dev/dags/jaffle_shop_watcher_kubernetes.py
tatiana Jan 6, 2026
784bccd
Update tests/operators/test_watcher_kubernetes.py
tatiana Jan 6, 2026
77f44db
Merge branch 'main' into k8s-watcher-mess
tatiana Jan 6, 2026
7bfa62f
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Jan 6, 2026
a56f8d4
Fix tests after rebase
tatiana Jan 6, 2026
ad41f97
Fix tests with old function name
tatiana Jan 6, 2026
a6a8c1b
Revert unnecessary changes to jaffle_shop_kubernetes DAG
tatiana Jan 6, 2026
48baa8b
Fix after rebase
tatiana Jan 6, 2026
176b0c1
Update tests/test_example_k8s_dags.py
tatiana Jan 6, 2026
e7b7a20
Update cosmos/airflow/_override.py
tatiana Jan 6, 2026
17de369
Update cosmos/operators/_watcher/triggerer.py
tatiana Jan 6, 2026
495bafc
Rename _use_event to use_event
tatiana Jan 7, 2026
5ec7e17
Revert from AF 3.1 to 3.0
tatiana Jan 7, 2026
025552a
Merge branch 'main' into k8s-watcher-mess
tatiana Jan 7, 2026
b06bacc
Fix running kubernetes watcher unit tests
tatiana Jan 7, 2026
c6c156e
Fix running k8s watcher unit tests
tatiana Jan 7, 2026
1f18858
Update .github/workflows/test.yml
tatiana Jan 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ignore:
- "cosmos/airflow/_override.py"
3 changes: 2 additions & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from cosmos import settings

__version__ = "1.12.1a1"
__version__ = "1.13.0a1"


if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
193 changes: 193 additions & 0 deletions cosmos/airflow/_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import math
import time
from collections.abc import Callable
from datetime import timedelta

import pendulum
from airflow.providers.cncf.kubernetes import __version__ as airflow_k8s_provider_version
Comment thread
tatiana marked this conversation as resolved.
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLoggingStatus, PodManager
from airflow.utils.timezone import utcnow
from kubernetes.client.models.v1_pod import V1Pod
from packaging.version import Version
from pendulum import DateTime
from urllib3.exceptions import HTTPError, TimeoutError

from cosmos.constants import _K8s_WATCHER_MIN_K8S_PROVIDER_VERSION


# This is being added to overcome the issue with the KubernetesPodOperator logs repeating:
# https://github.com/apache/airflow/issues/59366
# It can be removed once it is fixed in the upstream provider.
class CosmosKubernetesPodManager(PodManager): # type: ignore[misc]
"""Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator."""

def fetch_container_logs( # noqa: C901
self,
pod: V1Pod,
container_name: str,
*,
follow: bool = False,
since_time: DateTime | None = None,
post_termination_timeout: int = 120,
container_name_log_prefix_enabled: bool = True,
log_formatter: Callable[[str, str], str] | None = None,
) -> PodLoggingStatus:
"""
Follow the logs of container and stream to airflow logging.

Returns when container exits.

Between when the pod starts and logs being available, there might be a delay due to CSR not approved
and signed yet. In such situation, ApiException is thrown. This is why we are retrying on this
specific exception.

:meta private:
"""

def consume_logs( # noqa: C901
*, since_time: DateTime | None = None
) -> tuple[DateTime | None, Exception | None]:
"""
Try to follow container logs until container completes.

For a long-running container, sometimes the log read may be interrupted
Such errors of this kind are suppressed.

Returns the last timestamp observed in logs.
"""
# Cosmos implementation difference when compared to proposal to fix the issue in the upstream provider:
# https://github.com/apache/airflow/pull/59372/
if Version(airflow_k8s_provider_version) >= Version("10.10.0"):
from airflow.providers.cncf.kubernetes.utils.pod_manager import parse_log_line
elif (
Version(airflow_k8s_provider_version) >= _K8s_WATCHER_MIN_K8S_PROVIDER_VERSION
): # Successfully tested with Airflow 3.1.0 and K8s provider 10.8.0 and 10.9.0
parse_log_line = self.parse_log_line
else:
raise ValueError(
f"Unsupported K8s provider version: {airflow_k8s_provider_version}. "
f"Minimum required version is {_K8s_WATCHER_MIN_K8S_PROVIDER_VERSION}"
)
# Cosmos custom implementation finishes here.

exception = None
last_captured_timestamp = None
# We timeout connections after 30 minutes because otherwise they can get
# stuck forever. The 30 is somewhat arbitrary.
# As a consequence, a TimeoutError will be raised no more than 30 minutes
# after starting read.
connection_timeout = 60 * 30
# We set a shorter read timeout because that helps reduce *connection* timeouts
# (since the connection will be restarted periodically). And with read timeout,
# we don't need to worry about either duplicate messages or losing messages; we
# can safely resume from a few seconds later
read_timeout = 60 * 5
try:
since_seconds = None
if since_time:
try:
since_seconds = math.ceil((pendulum.now() - since_time).total_seconds())
except TypeError:
self.log.warning(
"Error calculating since_seconds with since_time %s. Using None instead.",
since_time,
)
logs = self.read_pod_logs(
pod=pod,
container_name=container_name,
timestamps=True,
since_seconds=since_seconds,
follow=follow,
post_termination_timeout=post_termination_timeout,
_request_timeout=(connection_timeout, read_timeout),
)
message_to_log = None
message_timestamp = None
progress_callback_lines = []
try:
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = parse_log_line(line)
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the log
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines.append(line)
else: # previous log line is complete
for callback in self._callbacks:
callback.progress_callback(
line=message_to_log,
client=self._client,
mode=ExecutionMode.SYNC,
container_name=container_name,
timestamp=message_timestamp,
pod=pod,
)
self._log_message(
message_to_log,
container_name,
container_name_log_prefix_enabled,
log_formatter,
)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines = [line]
else: # continuation of the previous log line
message_to_log = f"{message_to_log}\n{message}"
progress_callback_lines.append(line)
finally:
# log the last line and update the last_captured_timestamp
if message_to_log is not None:
for callback in self._callbacks:
callback.progress_callback(
line=message_to_log,
client=self._client,
mode=ExecutionMode.SYNC,
container_name=container_name,
timestamp=message_timestamp,
pod=pod,
)
self._log_message(
message_to_log, container_name, container_name_log_prefix_enabled, log_formatter
)
last_captured_timestamp = message_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to avoid
# duplicate log entries
if val := (last_captured_timestamp or since_time):
return val.add(seconds=2), e
except HTTPError as e:
exception = e
self._http_error_timestamps = getattr(self, "_http_error_timestamps", [])
self._http_error_timestamps = [
t for t in self._http_error_timestamps if t > utcnow() - timedelta(seconds=60)
]
self._http_error_timestamps.append(utcnow())
# Log only if more than 2 errors occurred in the last 60 seconds
if len(self._http_error_timestamps) > 2:
self.log.exception(
"Reading of logs interrupted for container %r; will retry.",
container_name,
)
return last_captured_timestamp or since_time, exception

# note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
# loop as we do here. But in a long-running process we might temporarily lose connectivity.
# So the looping logic is there to let us resume following the logs.
last_log_time = since_time
while True:
last_log_time, exc = consume_logs(since_time=last_log_time)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
if not follow:
return PodLoggingStatus(running=True, last_log_time=last_log_time)
# a timeout is a normal thing and we ignore it and resume following logs
if not isinstance(exc, TimeoutError):
self.log.warning(
"Pod %s log read interrupted but container %s still running. Logs generated in the last one second might get duplicated.",
pod.metadata.name,
container_name,
)
time.sleep(1)
37 changes: 22 additions & 15 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,20 @@ def create_test_task_metadata(
if node:
args_to_override = node.operator_kwargs_to_override

dbt_class = "DbtTest"
watcher_to_test_execution_mode = {
ExecutionMode.WATCHER: ExecutionMode.LOCAL,
ExecutionMode.WATCHER_KUBERNETES: ExecutionMode.KUBERNETES,
}
if (
execution_mode == ExecutionMode.WATCHER
and render_config is not None
render_config is not None
and render_config.test_behavior == TestBehavior.AFTER_ALL
and execution_mode in (ExecutionMode.WATCHER, ExecutionMode.WATCHER_KUBERNETES)
):
operator_class = "cosmos.operators.local.DbtTestLocalOperator"
test_execution_mode = watcher_to_test_execution_mode[execution_mode]
operator_class = calculate_operator_class(execution_mode=test_execution_mode, dbt_class=dbt_class)
else:
operator_class = calculate_operator_class(
execution_mode=execution_mode,
dbt_class="DbtTest",
)
operator_class = calculate_operator_class(execution_mode=execution_mode, dbt_class=dbt_class)

return TaskMetadata(
id=test_task_name,
Expand Down Expand Up @@ -647,6 +650,7 @@ def _add_watcher_producer_task(
tasks_map: dict[str, Any],
task_group: TaskGroup | None,
render_config: RenderConfig | None = None,
execution_mode: ExecutionMode = ExecutionMode.WATCHER,
) -> BaseOperator:
"""
Create the producer task for the watcher execution mode and add it to the tasks_map.
Expand All @@ -665,10 +669,12 @@ def _add_watcher_producer_task(
"resource_type:unit_test",
]

class_name = calculate_operator_class(execution_mode, "DbtProducer")

# First, we create the producer task
producer_task_metadata = TaskMetadata(
id=PRODUCER_WATCHER_TASK_ID,
operator_class="cosmos.operators.watcher.DbtProducerWatcherOperator",
operator_class=class_name,
arguments=producer_task_args,
)
producer_airflow_task = create_airflow_task(producer_task_metadata, dag, task_group=task_group)
Expand All @@ -682,13 +688,17 @@ def _add_watcher_dependencies(
task_args: dict[str, Any],
tasks_map: dict[str, Any],
nodes: dict[str, DbtNode] | None = None,
) -> str:
) -> None:
Comment thread
tatiana marked this conversation as resolved.
"""
Iterate through the watcher consumer tasks and:
- set the producer task ID in all of them
- make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom
"""
for node_id, task_or_taskgroup in tasks_map.items():
# We do not want to set a dependency between the producer task and itself
if node_id == PRODUCER_WATCHER_TASK_ID:
continue

node_tasks = (
list(task_or_taskgroup.children.values())
if isinstance(task_or_taskgroup, TaskGroup)
Expand All @@ -701,7 +711,6 @@ def _add_watcher_dependencies(
# We only managed to do this in the case of DbtDag.
# The way it is implemented is by setting the trigger_rule to "always" for the consumer tasks, and by having the producer task with a high priority_weight.
if "DbtDag" in dag.__class__.__name__:

# Is this dbt node a root of the (subset of the) dbt project?
# Note: this may happen in one scenarios:
# - the dbt node not having any `depends_on` within the user-selected `nodes`
Expand All @@ -714,12 +723,9 @@ def _add_watcher_dependencies(
]
else:
always_run_tasks = [task_or_taskgroup]

for task in always_run_tasks:
task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[attr-defined]

return producer_airflow_task.task_id


def should_create_detached_nodes(render_config: RenderConfig) -> bool:
"""
Expand Down Expand Up @@ -859,7 +865,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro
if execution_mode == ExecutionMode.AIRFLOW_ASYNC:
# This property is only relevant for the setup task, not the other tasks:
virtualenv_dir = task_args.pop("virtualenv_dir", None)
elif execution_mode == ExecutionMode.WATCHER:
elif execution_mode in (ExecutionMode.WATCHER, ExecutionMode.WATCHER_KUBERNETES):
setup_operator_args = getattr(execution_config, "setup_operator_args", None) or {}
# We are intentionally creating the producer task ahead of the consumer tasks
# Airflow priority weight is not being respected in multiple versions of the library, including 3.1
Expand All @@ -870,6 +876,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro
tasks_map=tasks_map,
task_group=task_group,
render_config=render_config,
execution_mode=execution_mode,
)

for node_id, node in nodes.items():
Expand Down Expand Up @@ -940,7 +947,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro

create_airflow_task_dependencies(nodes, tasks_map)

if execution_mode == ExecutionMode.WATCHER:
if execution_mode in (ExecutionMode.WATCHER, ExecutionMode.WATCHER_KUBERNETES):
setup_operator_args = getattr(execution_config, "setup_operator_args", None) or {}
_add_watcher_dependencies(
dag=dag,
Expand Down
3 changes: 3 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ExecutionMode(Enum):
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"
GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"
WATCHER_KUBERNETES = "watcher_kubernetes"


class InvocationMode(Enum):
Expand Down Expand Up @@ -195,3 +196,5 @@ def _missing_value_(cls, value): # type: ignore
TELEMETRY_TIMEOUT = 1.0

_AIRFLOW3_MAJOR_VERSION = 3

_K8s_WATCHER_MIN_K8S_PROVIDER_VERSION = Version("10.8.0")
Comment thread
tatiana marked this conversation as resolved.
2 changes: 1 addition & 1 deletion cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class BaseConsumerSensor(BaseSensorOperator): # type: ignore[misc]
def __init__(
self,
*,
project_dir: str,
profile_config: ProfileConfig | None = None,
project_dir: str | None = None,
profiles_dir: str | None = None,
producer_task_id: str = PRODUCER_WATCHER_TASK_ID,
poke_interval: int = 10,
Expand Down
9 changes: 8 additions & 1 deletion cosmos/operators/_watcher/triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ def _get_xcom_val() -> Any | None:
return await sync_to_async(_get_xcom_val)()

async def get_xcom_val(self, key: str) -> Any | None:
self.log.info(
"Trying to retrieve value using XCom key <%s> by task_id <%s>, dag_id <%s>, run_id <%s> and map_index <%s>",
key,
self.producer_task_id,
self.dag_id,
self.run_id,
self.map_index,
)
if AIRFLOW_VERSION < Version("3.0.0"):
return await self.get_xcom_val_af2(key)
else:
Expand Down Expand Up @@ -141,7 +149,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
yield TriggerEvent({"status": "failed", "reason": "producer_failed"}) # type: ignore[no-untyped-call]
return

# Sleep briefly before re-polling
await asyncio.sleep(self.poke_interval)
self.log.debug("Polling again for model '%s' status...", self.model_unique_id)
Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ class DbtConsumerWatcherSensor(BaseConsumerSensor, DbtRunLocalOperator): # type
def __init__(
self,
*,
project_dir: str,
Comment thread
tatiana marked this conversation as resolved.
profile_config: ProfileConfig | None = None,
project_dir: str | None = None,
profiles_dir: str | None = None,
producer_task_id: str = PRODUCER_WATCHER_TASK_ID,
poke_interval: int = 10,
Expand Down
Loading